﻿/**
* CRL
*/
using CRL.EventBus.Queue;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using System.Linq;
using System.Collections;
using System.ComponentModel;
using System.Linq.Expressions;
using System.Threading;
using System.Reflection;

namespace CRL.EventBus
{
    /// <summary>
    /// Per-publish options. Callers receive an instance through the
    /// <c>Action&lt;PublishOption&gt;</c> callback of the publish methods and set the values they need.
    /// </summary>
    public class PublishOption
    {
        /// <summary>
        /// Delay in milliseconds. NOTE(review): how each queue implementation honors this is not visible here — confirm.
        /// </summary>
        public int DelayMs { get; set; }
        /// <summary>
        /// Message priority. NOTE(review): ordering semantics depend on the queue implementation — confirm.
        /// </summary>
        public int Priority { get; set; }
        /// <summary>
        /// Message key. <c>AbsPublisher.Request</c> in this file sets it to a per-request GUID and
        /// matches a reply by comparing that GUID with the <c>MsgKey</c> of the data passed to its callback.
        /// </summary>
        public string MsgKey { get; set; }
        /// <summary>
        /// Delay in seconds. The original (mis-encoded) comment appears to say this applies to the
        /// DB-backed queue — TODO confirm.
        /// </summary>
        public int DelaySecond { get; set; }
    }
    /// <summary>
    /// Publishing contract of the event bus. The implementation in this file is <c>AbsPublisher</c>,
    /// whose publish methods return the publisher itself so calls can be chained.
    /// </summary>
    public interface IPublisher
    {
        /// <summary>
        /// Publishes a single message under the event <paramref name="name"/>.
        /// <paramref name="optFunc"/>, when supplied, is forwarded to the queue to configure the <see cref="PublishOption"/>.
        /// </summary>
        IPublisher Publish<T>(string name, T msg, Action<PublishOption> optFunc = null);
        /// <summary>
        /// Publishes a sequence of messages under the event <paramref name="name"/>.
        /// </summary>
        IPublisher BatchPublish<T>(string name, IEnumerable<T> msgs, Action<PublishOption> optFunc = null);
        /// <summary>
        /// Asynchronous counterpart of <c>Publish</c>.
        /// </summary>
        Task<IPublisher> PublishAsync<T>(string name, T msg, Action<PublishOption> optFunc = null);
        /// <summary>
        /// Asynchronous counterpart of <c>BatchPublish</c>.
        /// </summary>
        Task<IPublisher> BatchPublishAsync<T>(string name, IEnumerable<T> msgs, Action<PublishOption> optFunc = null);
        /// <summary>
        /// Publishes <paramref name="msg"/> and waits for a reply of type <typeparamref name="TResponse"/>.
        /// In <c>AbsPublisher</c> the wait is bounded by <paramref name="timeOutMs"/> milliseconds and a
        /// <see cref="TimeoutException"/> is thrown when no reply arrives in time.
        /// </summary>
        TResponse Request<TResponse>(string name, object msg, int timeOutMs = 1000);
        /// <summary>
        /// The current queue.
        /// </summary>
        AbsQueue Queue { get; }
    }

    /// <summary>
    /// Concrete publisher: creates its queue client through <c>QueueFactory.CreateClient</c> from the
    /// MQ setting selected out of the supplied <c>QueueConfig</c>, and wires up the subscribe service
    /// used by <c>Request</c>.
    /// </summary>
    public class Publisher : AbsPublisher
    {
#if NETSTANDARD
        /// <summary>
        /// Dependency-injection constructor: configuration comes from <c>IOptions</c> and the
        /// subscribe service is the injected instance. Uses the <c>MQType.None</c> setting.
        /// </summary>
        public Publisher(Microsoft.Extensions.Options.IOptions<QueueConfig> options, SubscribeService _subService) : base(options.Value)
        {
            var mqSetting = options.Value.GetMqSetting(MQType.None);
            queue = QueueFactory.CreateClient(mqSetting, false);
            subService = _subService;
        }
#endif
        /// <summary>
        /// Creates a publisher for the <c>MQType.None</c> setting of <paramref name="_queueConfig"/>,
        /// using the static <c>SubscribeService._instance</c>.
        /// </summary>
        public Publisher(QueueConfig _queueConfig) : this(_queueConfig, MQType.None)
        {
        }
        /// <summary>
        /// Creates a publisher for the setting of <paramref name="_queueConfig"/> selected by
        /// <paramref name="mQType"/>, using the static <c>SubscribeService._instance</c>.
        /// </summary>
        public Publisher(QueueConfig _queueConfig, MQType mQType) : base(_queueConfig)
        {
            var mqSetting = _queueConfig.GetMqSetting(mQType);
            queue = QueueFactory.CreateClient(mqSetting, false);
            subService = SubscribeService._instance;
        }
    }
    /// <summary>
    /// Base publisher. Forwards messages to the underlying <c>AbsQueue</c> under the event name
    /// resolved by <c>queueConfig.getEventName</c>, and implements a blocking request/response call
    /// on top of publish plus a reply subscription.
    /// </summary>
    public class AbsPublisher : IPublisher, IDisposable
    {
        // Makes the "look up, else register + subscribe" step in Request atomic, so two threads issuing
        // the first request for the same name cannot both register it (the second callbackStatus.Add
        // would throw on the duplicate key and the reply subscription would be added twice).
        private static readonly object callbackRegistrationLock = new object();

        protected SubscribeService subService;
        protected AbsQueue queue;
        protected QueueConfig queueConfig;

        /// <summary>
        /// Stores the configuration; derived classes are expected to assign <c>queue</c> and <c>subService</c>.
        /// </summary>
        public AbsPublisher(QueueConfig _queueConfig)
        {
            queueConfig = _queueConfig;
        }

        /// <summary>
        /// Disposes the underlying queue, if one was assigned.
        /// </summary>
        public void Dispose()
        {
            // Dispose must not throw when a derived class never assigned the queue.
            queue?.Dispose();
        }

        /// <summary>
        /// The current queue.
        /// </summary>
        public AbsQueue Queue => queue;

        /// <summary>
        /// Publishes a single message.
        /// </summary>
        /// <param name="name">Event name; resolved through <c>queueConfig.getEventName</c>.</param>
        /// <param name="msg">The message. Must not be null and must not be a collection (strings are allowed).</param>
        /// <param name="optFunc">Optional callback forwarded to the queue to configure the <see cref="PublishOption"/>.</param>
        /// <returns>This publisher, for chaining.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="msg"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="msg"/> is a non-string <see cref="IEnumerable"/>.</exception>
        public IPublisher Publish<T>(string name, T msg, Action<PublishOption> optFunc = null)
        {
            EnsureSingleMessage(msg);
            queue.PublishList(queueConfig.getEventName(name), new List<object> { msg }, optFunc);
            return this;
        }

        /// <summary>
        /// Asynchronous counterpart of <c>Publish</c>; validation failures surface through the returned task.
        /// </summary>
        public async Task<IPublisher> PublishAsync<T>(string name, T msg, Action<PublishOption> optFunc = null)
        {
            EnsureSingleMessage(msg);
            await queue.PublishListAsync(queueConfig.getEventName(name), new List<object> { msg }, optFunc);
            return this;
        }

        /// <summary>
        /// Publishes a sequence of messages in one call. An empty sequence publishes nothing.
        /// </summary>
        /// <returns>This publisher, for chaining.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="msgs"/> is null.</exception>
        public IPublisher BatchPublish<T>(string name, IEnumerable<T> msgs, Action<PublishOption> optFunc = null)
        {
            var batch = MaterializeBatch(msgs);
            if (batch.Count == 0)
            {
                return this;
            }
            // Passed with static type IEnumerable<object>, as the original lazy projection was.
            IEnumerable<object> payload = batch;
            queue.PublishList(queueConfig.getEventName(name), payload, optFunc);
            return this;
        }

        /// <summary>
        /// Asynchronous counterpart of <c>BatchPublish</c>.
        /// </summary>
        public async Task<IPublisher> BatchPublishAsync<T>(string name, IEnumerable<T> msgs, Action<PublishOption> optFunc = null)
        {
            var batch = MaterializeBatch(msgs);
            if (batch.Count == 0)
            {
                return this;
            }
            IEnumerable<object> payload = batch;
            await queue.PublishListAsync(queueConfig.getEventName(name), payload, optFunc);
            return this;
        }

        /// <summary>
        /// Publishes <paramref name="msg"/> and blocks until the matching reply arrives or the timeout elapses.
        /// A fresh GUID is sent as the message key and used to correlate the reply.
        /// </summary>
        /// <param name="name">Event name of the request; replies are subscribed under <c>{name}_callBack</c>.</param>
        /// <param name="msg">The request payload.</param>
        /// <param name="timeOutMs">Maximum time to wait for the reply, in milliseconds.</param>
        /// <returns>The reply data cast to <typeparamref name="TResponse"/>.</returns>
        /// <exception cref="ArgumentNullException">No subscribe service was assigned to this publisher.</exception>
        /// <exception cref="TimeoutException">No matching reply arrived within <paramref name="timeOutMs"/>.</exception>
        public TResponse Request<TResponse>(string name, object msg, int timeOutMs = 1000)
        {
            if (subService == null)
            {
                throw new ArgumentNullException(nameof(subService));
            }
            TResponse result = default(TResponse);
            bool callbacked = false;
            var contextId = Guid.NewGuid().ToString();
            // Intentionally not disposed here: a callback already in flight on another thread may
            // still call Set() after the wait below has returned.
            var autoResetEvent = new AutoResetEvent(false);
            void onCallBack(object sender, EventDeclare ed, object data, IData reqData)
            {
                // Only react to the reply that carries this request's key.
                if (ed.Name == name && contextId == reqData.MsgKey)
                {
                    result = (TResponse)data;
                    callbacked = true;
                    autoResetEvent.Set();
                    // The DB-backed queue needs the request row deleted manually.
                    if (sender is DbQueue dbQueue)
                    {
                        dbQueue.DeleteData(reqData);
                    }
                }
            }

            callbackModel replyChannel;
            lock (callbackRegistrationLock)
            {
                replyChannel = callbackStatus.Get(name);
                if (replyChannel == null)
                {
                    // First request for this name: register the channel and subscribe to its replies.
                    replyChannel = callbackStatus.Add(name);
                    subService.AddSubscribe<TResponse>(new SubscribeAttribute { Name = $"{name}_callBack" }, b =>
                    {
                        // Returns true unconditionally; NOTE(review): presumably this acknowledges the
                        // reply while delivery to waiting requests goes through callbackModel.invoke — confirm.
                        return true;
                    }, queue, false);
                }
            }

            replyChannel.onSubCallback += onCallBack;
            try
            {
                Publish(name, msg, opt =>
                {
                    opt.MsgKey = contextId;
                });
                autoResetEvent.WaitOne(timeOutMs);
            }
            finally
            {
                // Always detach, whether publishing failed, the wait timed out, or the reply arrived;
                // exceptions propagate with their original stack trace.
                replyChannel.onSubCallback -= onCallBack;
            }

            if (callbacked)
            {
                return result;
            }
            // The DB-backed queue needs the unanswered request deleted manually.
            if (queue is DbQueue pendingQueue)
            {
                pendingQueue.DeleteMsg(contextId);
            }
            throw new TimeoutException($"{name} wait timeout at {timeOutMs}ms");
        }

        // Rejects null and collection payloads for the single-message publish methods.
        private static void EnsureSingleMessage<T>(T msg)
        {
            if (msg == null)
            {
                throw new ArgumentNullException(nameof(msg));
            }
            if (!(msg is string) && msg is IEnumerable)
            {
                throw new ArgumentException("msg不能为IEnumerable", nameof(msg));
            }
        }

        // Boxes the batch into a list exactly once so a lazy source is not enumerated twice.
        private static List<object> MaterializeBatch<T>(IEnumerable<T> msgs)
        {
            if (msgs == null)
            {
                throw new ArgumentNullException(nameof(msgs));
            }
            return msgs.Select(b => (object)b).ToList();
        }
    }

    /// <summary>
    /// Named holder of the reply-callback event for one event name. Waiting requests attach
    /// handlers to <c>onSubCallback</c>; <c>invoke</c> raises it.
    /// </summary>
    class callbackModel
    {
        public string name;
        internal event CallbackEventHandler onSubCallback;

        /// <summary>
        /// Creates the holder for the event called <paramref name="_name"/>.
        /// </summary>
        public callbackModel(string _name)
        {
            this.name = _name;
        }

        /// <summary>
        /// Raises <c>onSubCallback</c> with the given arguments; does nothing when no handler is attached.
        /// </summary>
        public void invoke(object sender, EventDeclare ed, object data, IData reqData)
        {
            var listeners = onSubCallback;
            if (listeners != null)
            {
                listeners(sender, ed, data, reqData);
            }
        }

        /// <summary>
        /// Returns the event name.
        /// </summary>
        public override string ToString() => name;
    }
    /// <summary>
    /// Process-wide registry of <c>callbackModel</c> instances keyed by event name.
    /// All access to the underlying dictionary is serialized, because <c>Dictionary</c> does not
    /// support reads concurrent with writes.
    /// </summary>
    internal class callbackStatus
    {
        static readonly object syncRoot = new object();
        static readonly Dictionary<string, callbackModel> status = new Dictionary<string, callbackModel>();

        /// <summary>
        /// Creates and registers a model for <paramref name="name"/>.
        /// </summary>
        /// <returns>The newly registered model.</returns>
        /// <exception cref="System.ArgumentException">A model with the same name is already registered.</exception>
        public static callbackModel Add(string name)
        {
            var model = new callbackModel(name);
            lock (syncRoot)
            {
                status.Add(name, model);
            }
            return model;
        }

        /// <summary>
        /// Looks up the model registered for <paramref name="name"/>.
        /// </summary>
        /// <returns>The registered model, or null when none exists.</returns>
        public static callbackModel Get(string name)
        {
            lock (syncRoot)
            {
                status.TryGetValue(name, out var m);
                return m;
            }
        }
    }
}
